package com.atguigu.day03;

import com.atguigu.utils.ClickEvent;
import com.atguigu.utils.ClickSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// ProcessFunction实现FlatMapFunction的功能
public class Example12 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new ClickSource())
                .process(new ProcessFunction<ClickEvent, ClickEvent>() {
                    @Override
                    public void processElement(ClickEvent in, Context ctx, Collector<ClickEvent> out) throws Exception {
                        if (in.username.equals("Mary")) {
                            for (int i = 0; i < 2; i++) {
                                out.collect(in);
                            }
                        } else if (in.username.equals("Bob")) {
                            out.collect(in);
                        }
                    }
                })
                .print();

        env.execute();
    }
}
